In [4]:
%run startup.py

In [5]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 3: Transformation

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

We also require acquaintance with the marble diagrams feature of RxPy.

Table of Contents

I want emit the items from an Observable after transforming them

... one at a time with a function map / pluck / pluck_attr


In [3]:
reset_start_time(O.map, title='map') # alias is "select"
# warming up:
d = subs(O.from_((1, 2 , 3)).map(lambda x: x * 2))



========== map ==========

function select of module rx.operators.observable.select:
Project each element of an observable sequence into a new form
    by incorporating the element's index.

    1 - source.map(lambda value: value * value)
    2 - source.map(lambda value, index: value * value + index)

    Keyword arguments:
    :param Callable[[Any, Any], Any] mapper: A transform function to
        apply to each source element; the second parameter of the
        function represents the index of the source element.
    :rtype: Observable

    Returns an observable sequence whose elements are the result of
    invoking the transform function on each element of source.
    
--------------------------------------------------------------------------------

   0.6     M New subscription on stream 276083189
   2.1     M [next]    1.5: 2
   2.5     M [next]    1.9: 4
   3.0     M [next]    2.4: 6
   3.3     M [cmpl]    2.7: fin

In [3]:
rst(O.pluck, title='pluck')
d = subs(O.from_([{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]).pluck('y'))

class Coord:
    def __init__(self, x, y): 
        self.x = x
        self.y = y
rst(title='pluck_attr')        
d = subs(O.from_([Coord(1, 2), Coord(3, 4)]).pluck_attr('y'))



========== pluck ==========

function pluck of module rx.operators.observable.pluck:
Retrieves the value of a specified key using dict-like access (as in
    element[key]) from all elements in the Observable sequence.

    Keyword arguments:
    key {String} The key to pluck.

    Returns a new Observable {Observable} sequence of key values.

    To pluck an attribute of each element, use pluck_attr.

    
--------------------------------------------------------------------------------

   1.0     M New subscription on stream 276081749
   1.8     M [next]    0.6: 2
   2.2     M [next]    1.0: 4
   2.5     M [cmpl]    1.3: fin


========== pluck_attr ==========


   0.4     M New subscription on stream 276081825
   0.6     M [next]    0.2: 2
   0.8     M [next]    0.4: 4
   1.0     M [cmpl]    0.6: fin

...by emitting all of the items emitted by corresponding Observables


In [4]:
rst(O.flat_map)
stream = O.range(1, 2)\
           .flat_map(lambda x: O.range(x, 2)) # alias: flat_map
d = subs(stream)


function flat_map of module rx.operators.observable.selectmany:
One of the Following:
    Projects each element of an observable sequence to an observable
    sequence and merges the resulting observable sequences into one
    observable sequence.

    1 - source.flat_map(lambda x: Observable.range(0, x))

    Or:
    Projects each element of an observable sequence to an observable
    sequence, invokes the result mapper for the source element and each
    of the corresponding inner sequence's elements, and merges the results
    into one observable sequence.

    1 - source.flat_map(lambda x: Observable.range(0, x), lambda x, y: x + y)

    Or:
    Projects each element of the source observable sequence to the other
    observable sequence and merges the resulting observable sequences into
    one observable sequence.

    1 - source.flat_map(Observable.from_([1,2,3]))

    Keyword arguments:
    mapper -- A transform function to apply to each element or an
        observable sequence to project each element from the source
        sequence onto.
    result_mapper -- [Optional] A transform function to apply to each
        element of the intermediate sequence.

    Returns an observable sequence whose elements are the result of
    invoking the one-to-many transform function collectionSelector on each
    element of the input sequence and then mapping each of those sequence
    elements and their corresponding source element to a result element.
    
--------------------------------------------------------------------------------

   0.5     M New subscription on stream 275034569
   2.2     M [next]    1.4: 1
   2.9     M [next]    2.1: 2
   3.0     M [next]    2.3: 2
   3.5     M [next]    2.8: 3
   3.9     M [cmpl]    3.1: fin

In [8]:
rst() # from an array
s1 = O.from_(('a', 'b', 'c'))
d  = subs(s1.flat_map(lambda x: x))
d  = subs(s1.flat_map(lambda x, i: (x, i)))
#d = subs(O.from_(('a', 'b', 'c')).flat_map(lambda x, i: '%s%s' % (x, i))) # ident, a string is iterable

header('using a result mapper')

def res_sel(*a):
    # in conrast to the RxJS example I get only 3 parameters, see output
    return '-'.join([str(s) for s in a])

# for every el of the original stream we get *additional* two elements: the el and its index:
d = subs(s1.flat_map(lambda x, i: (x, i)         , res_sel))
# ident, flat_map flattens the inner stream:
d = subs(s1.flat_map(lambda x, i: O.from_((x, i)), res_sel))


   0.4     M New subscription on stream 276591581
   1.6     M [next]    1.0: a
   2.5     M [next]    1.9: b
   3.5     M [next]    2.9: c
   4.0     M [cmpl]    3.5: fin

   4.5     M New subscription on stream 276589117
   5.2     M [next]    0.6: a
   5.6     M [next]    1.0: 0
   5.9     M [next]    1.2: b
   6.5     M [next]    1.8: 1
   7.0     M [next]    2.4: c
   7.9     M [next]    3.3: 2
   8.7     M [cmpl]    4.1: fin


========== using a result mapper ==========


  10.1     M New subscription on stream 276589129
  11.0     M [next]    0.7: a-a-0
  11.8     M [next]    1.5: a-0-1
  12.0     M [next]    1.7: b-b-0
  12.9     M [next]    2.6: b-1-1
  13.1     M [next]    2.9: c-c-0
  14.3     M [next]    4.0: c-2-1
  15.0     M [cmpl]    4.7: fin

  15.9     M New subscription on stream 276582045
  17.2     M [next]    1.0: a-a-0
  18.4     M [next]    2.2: a-0-1
  18.8     M [next]    2.6: b-b-0
  20.0     M [next]    3.8: b-1-1
  20.7     M [next]    4.5: c-c-0
  21.4     M [next]    5.2: c-2-1
  22.0     M [cmpl]    5.8: fin

In [29]:
rst(O.flat_map_latest) # alias: select_switch

d = subs(O.range(1, 2).flat_map_latest(lambda x: O.range(x, 2)))

# maybe better to understand: A, B, C are emitted always more recent, then the inner streams' elements
d = subs(O.from_(('A', 'B', 'C')).flat_map_latest(
        lambda x, i: O.from_(('%s%s-a' % (x, i),
                              '%s%s-b' % (x, i),
                              '%s%s-c' % (x, i),
                             ))))

# with emission delays: Now the inner's is faster:
outer = O.from_marbles('A--B--C|').to_blocking()
inner = O.from_marbles('a-b-c|').to_blocking()
# the inner .map is to show also outer's value
d = subs(outer.flat_map_latest(lambda X: inner.map(lambda x: '%s%s' % (X, x))))


function select_switch of module rx.operators.observable.selectswitch:
Projects each element of an observable sequence into a new sequence
    of observable sequences by incorporating the element's index and then
    transforms an observable sequence of observable sequences into an
    observable sequence producing values only from the most recent
    observable sequence.

    Keyword arguments:
    mapper -- {Function} A transform function to apply to each source
        element; the second parameter of the function represents the index
        of the source element.

    Returns an observable {Observable} sequence whose elements are the
    result of invoking the transform function on each element of source
    producing an Observable of Observable sequences and that at any point in
    time produces the elements of the most recent inner observable sequence
    that has been received.
    
--------------------------------------------------------------------------------

   0.5     M New subscription on stream 276552361
   1.6     M [next]    1.0: 1
   2.4     M [next]    1.8: 2
   2.7     M [next]    2.1: 3
   3.0     M [cmpl]    2.5: fin

   3.8     M New subscription on stream 274776265
   4.5     M [next]    0.5: A0-a
   5.2     M [next]    1.3: B1-a
   5.6     M [next]    1.7: C2-a
   6.0     M [next]    2.1: C2-b
   6.3     M [next]    2.3: C2-c
   6.5     M [cmpl]    2.5: fin

   7.2     M New subscription on stream 276554429
  29.4  T140 [next]   22.1: Aa
 142.3  T142 [next]  135.0: Ab
 240.7  T148 [next]  233.4: Ba
 355.5  T149 [next]  348.2: Bb
 456.5  T156 [next]  449.3: Ca
 570.4  T157 [next]  563.1: Cb
 677.2  T159 [next]  669.9: Cc
 688.4  T161 [cmpl]  681.1: fin

In [4]:
rst(O.for_in)
abc = O.from_marbles('a-b|').to_blocking()

# abc times 3, via:
d = subs(O.for_in([1, 2, 3],
                  lambda i: abc.map(
                      # just to get the results of array and stream:
                      lambda letter: '%s%s' % (letter, i) )))
sleep(0.5)
# we can also for_in from an observable.
# TODO: Dont' understand the output though - __doc__ says only arrays.
d = subs(O.for_in(O.from_((1, 2, 3)),
                  lambda i: abc.map(lambda letter: '%s%s' % (letter, i) )).take(2))


function for_in of module rx.operators.observable.forin:
Concatenates the observable sequences obtained by running the
    specified result mapper for each element in source.

    sources -- {Array} An array of values to turn into an observable
        sequence.
    result_mapper -- {Function} A function to apply to each item in the
        sources array to turn it into an observable sequence.
    Returns an observable {Observable} sequence from the concatenated
    observable sequences.
    
--------------------------------------------------------------------------------

   0.9     M New subscription on stream 276573325
  12.3   T28 [next]   11.2: a1
 122.9   T29 [next]  121.7: b1
 147.8   T34 [next]  146.6: a2
 261.4   T36 [next]  260.2: b2
 284.2   T40 [next]  283.0: a3
 394.4   T41 [next]  393.3: b3
 404.6   T42 [cmpl]  403.4: fin

 505.1     M New subscription on stream 276563949
 517.7   T46 [next]   12.6: a<rx.core.Observable.Observable object at 0x107c21710>
 627.3   T48 [next]  122.2: b<rx.core.Observable.Observable object at 0x107c21710>
 628.0   T48 [cmpl]  122.9: fin

many_select

manySelect internally transforms each item emitted by the source Observable into an Observable that emits that item and all items subsequently emitted by the source Observable, in the same order.

So, for example, it internally transforms an Observable that emits the numbers 1,2,3 into three Observables: one that emits 1,2,3, one that emits 2,3, and one that emits 3.

Then manySelect passes each of these Observables into a function that you provide, and emits, as the emissions from the Observable that manySelect returns, the return values from those function calls.

In this way, each item emitted by the resulting Observable is a function of the corresponding item in the source Observable and all of the items emitted by the source Observable after it.


In [25]:
rst(O.many_select) 
stream = O.from_marbles('a-b-c|')
# TODO: more use cases
d = subs(stream.many_select(lambda x: x.first()).merge_all())


function many_select of module rx.operators.observable.manyselect:
Comonadic bind operator. Internally projects a new observable for each
    value, and it pushes each observable into the user-defined mapper function
    that projects/queries each observable into some result.

    Keyword arguments:
    mapper -- {Function} A transform function to apply to each element.
    scheduler -- {Object} [Optional] Scheduler used to execute the
        operation. If not specified, defaults to the ImmediateScheduler.

    Returns {Observable} An observable sequence which results from the
    comonadic bind operation.
    
--------------------------------------------------------------------------------

   1.0     M New subscription on stream 276604289
  13.0  T196 [next]   11.7: a
 123.5  T197 [next]  122.2: b
 234.1  T200 [next]  232.8: c
 246.1  T202 [cmpl]  244.7: fin

... based on ALL of the items that preceded them scan


In [32]:
rst(O.scan)
s = O.from_marbles("1-2-3-4---5").to_blocking()
d = subs(s.scan(lambda x, y: int(x) + int(y), seed=10000))


function scan of module rx.operators.observable.scan:
Applies an accumulator function over an observable sequence and
    returns each intermediate result. The optional seed value is used as
    the initial accumulator value. For aggregation behavior with no
    intermediate results, see Observable.aggregate.

    1 - scanned = source.scan(lambda acc, x: acc + x)
    2 - scanned = source.scan(lambda acc, x: acc + x, 0)

    Keyword arguments:
    accumulator -- An accumulator function to be invoked on each element.
    seed -- [Optional] The initial accumulator value.

    Returns an observable sequence containing the accumulated values.
    
--------------------------------------------------------------------------------

   0.7     M New subscription on stream 276601945
  14.1  T240 [next]   13.1: 10001
 128.1  T242 [next]  127.0: 10003
 234.7  T243 [next]  233.6: 10006
 345.3  T246 [next]  344.3: 10010
 656.9  T247 [next]  655.8: 10015
 657.6  T249 [cmpl]  656.5: fin

... by attaching a timestamp to them timestamp


In [38]:
rst(O.timestamp)
# the timestamps are objects, not dicts:
d = subs(marble_stream('a-b-c|').timestamp().pluck_attr('timestamp'))


function timestamp of module rx.operators.observable.timestamp:
Records the timestamp for each value in an observable sequence.

    1 - res = source.timestamp() # produces { "value": x, "timestamp": ts }
    2 - res = source.timestamp(Scheduler.timeout)

    :param Scheduler scheduler: [Optional] Scheduler used to compute timestamps. If not
        specified, the timeout scheduler is used.

    Returns an observable sequence with timestamp information on values.
    
--------------------------------------------------------------------------------

   0.7     M New subscription on stream 276594097
  12.1  T276 [next]   11.2: 2016-12-20 20:23:22.236924
 122.7  T277 [next]  121.8: 2016-12-20 20:23:22.347565
 235.4  T279 [next]  234.5: 2016-12-20 20:23:22.460266
 244.0  T281 [cmpl]  243.1: fin

... into an indicator of the amount of time that lapsed before the emission of the item time_interval


In [44]:
rst(O.time_interval)
d = subs(marble_stream('a-b--c|').time_interval().map(lambda x: x.interval))


function time_interval of module rx.operators.observable.timeinterval:
Records the time interval between consecutive values in an
    observable sequence.

    1 - res = source.time_interval();
    2 - res = source.time_interval(Scheduler.timeout)

    Keyword arguments:
    scheduler -- [Optional] Scheduler used to compute time intervals. If
        not specified, the timeout scheduler is used.

    Return An observable sequence with time interval information on values.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 276593965
  12.7  T316 [next]   11.8: 0:00:00.011533
 127.7  T318 [next]  126.8: 0:00:00.115022
 339.1  T319 [next]  338.2: 0:00:00.211367
 344.4  T322 [cmpl]  343.6: fin

In [ ]: